package com.bihua.wx.service.impl;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.bihua.common.core.domain.PageQuery;
import com.bihua.common.core.page.TableDataInfo;
import com.bihua.common.utils.StringUtils;
import com.bihua.wx.domain.WxUser;
import com.bihua.wx.domain.bo.WxUserBo;
import com.bihua.wx.domain.vo.WxUserVo;
import com.bihua.wx.mapper.WxUserMapper;
import com.bihua.wx.service.IWxUserService;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.error.WxErrorException;
import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.api.WxMpUserService;
import me.chanjar.weixin.mp.bean.result.WxMpUser;
import me.chanjar.weixin.mp.bean.result.WxMpUserList;
import me.chanjar.weixin.mp.bean.tag.WxUserTag;

/**
 * 用户Service业务层处理
 *
 * @author ruoyi
 * @date 2023-05-21
 */
@RequiredArgsConstructor
@Service
@Slf4j
@CacheConfig(cacheNames = {"wxUserTagsServiceCache"})
public class WxUserServiceImpl implements IWxUserService {

    private final WxUserMapper baseMapper;

    private final WxMpService wxService;

    private volatile static  boolean syncWxUserTaskRunning=false;
    public static final String CACHE_KEY="'WX_USER_TAGS'";

    @Autowired
    @Qualifier("threadPoolTaskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    /**
     * 查询用户
     */
    @Override
    public WxUserVo queryById(String openid){
        return baseMapper.selectVoById(openid);
    }

    @Override
    public List<WxUserVo> listByIds(List<String> openids) {
        return baseMapper.selectVoBatchIds(openids);
    }

    /**
     * 查询用户列表
     */
    @Override
    public TableDataInfo<WxUserVo> queryPageList(WxUserBo bo, PageQuery pageQuery) {
        LambdaQueryWrapper<WxUser> lqw = buildQueryWrapper(bo);
        Page<WxUserVo> result = baseMapper.selectVoPage(pageQuery.build(), lqw);
        return TableDataInfo.build(result);
    }

    /**
     * 查询用户列表
     */
    @Override
    public List<WxUserVo> queryList(WxUserBo bo) {
        LambdaQueryWrapper<WxUser> lqw = buildQueryWrapper(bo);
        return baseMapper.selectVoList(lqw);
    }

    private LambdaQueryWrapper<WxUser> buildQueryWrapper(WxUserBo bo) {
        Map<String, Object> params = bo.getParams();
        LambdaQueryWrapper<WxUser> lqw = Wrappers.lambdaQuery();
        lqw.eq(StringUtils.isNotBlank(bo.getAppid()), WxUser::getAppid, bo.getAppid());
        lqw.eq(StringUtils.isNotBlank(bo.getPhone()), WxUser::getPhone, bo.getPhone());
        lqw.like(StringUtils.isNotBlank(bo.getNickname()), WxUser::getNickname, bo.getNickname());
        lqw.eq(bo.getSex() != null, WxUser::getSex, bo.getSex());
        lqw.eq(StringUtils.isNotBlank(bo.getCity()), WxUser::getCity, bo.getCity());
        lqw.eq(StringUtils.isNotBlank(bo.getProvince()), WxUser::getProvince, bo.getProvince());
        lqw.eq(StringUtils.isNotBlank(bo.getHeadimgurl()), WxUser::getHeadimgurl, bo.getHeadimgurl());
        lqw.eq(bo.getSubscribeTime() != null, WxUser::getSubscribeTime, bo.getSubscribeTime());
        lqw.eq(bo.getSubscribe() != null, WxUser::getSubscribe, bo.getSubscribe());
        lqw.eq(StringUtils.isNotBlank(bo.getUnionid()), WxUser::getUnionid, bo.getUnionid());
        if(CollectionUtils.isNotEmpty(bo.getTagidList())){
            lqw.and(new Consumer<LambdaQueryWrapper<WxUser>>() {
                @Override
                public void accept(LambdaQueryWrapper<WxUser> wxUserLambdaQueryWrapper) {
                    boolean firstFlag = true;
                    for (long tagid : bo.getTagidList()){
                        if(firstFlag){
                            firstFlag = false;
                        }else {
                            wxUserLambdaQueryWrapper.or();
                        }
                        wxUserLambdaQueryWrapper.apply(tagid != 0L, "JSON_CONTAINS(tagid_list,{0})", String.valueOf(tagid));
                    }
                }
            });
        }
        lqw.eq(StringUtils.isNotBlank(bo.getSubscribeScene()), WxUser::getSubscribeScene, bo.getSubscribeScene());
        lqw.eq(StringUtils.isNotBlank(bo.getQrSceneStr()), WxUser::getQrSceneStr, bo.getQrSceneStr());
        return lqw;
    }

    /**
     * 批量删除用户
     */
    @Override
    public Boolean deleteWithValidByIds(Collection<String> ids, Boolean isValid) {
        if(isValid){
            //TODO 做一些业务上的校验,判断是否需要校验
        }
        return baseMapper.deleteBatchIds(ids) > 0;
    }

    /**
     * 同步用户列表,公众号一次拉取调用最多拉取10000个关注者的OpenID，可以通过传入nextOpenid参数多次拉取
     */
    @Override
    @Async
    public void syncWxUsers(String appid) {
        //同步较慢，防止个多线程重复执行同步任务
        Assert.isTrue(!syncWxUserTaskRunning,"后台有同步任务正在进行中，请稍后重试");
        wxService.switchoverTo(appid);
        syncWxUserTaskRunning=true;
        log.info("同步公众号粉丝列表：任务开始");
        wxService.switchover(appid);
        boolean hasMore=true;
        String nextOpenid=null;
        WxMpUserService wxMpUserService = wxService.getUserService();
        try {
            int page=1;
            while (hasMore){
                WxMpUserList wxMpUserList = wxMpUserService.userList(nextOpenid);//拉取openid列表，每次最多1万个
                log.info("拉取openid列表：第{}页，数量：{}",page++,wxMpUserList.getCount());
                List<String> openids = wxMpUserList.getOpenids();
                this.syncWxUsers(openids,appid);
                nextOpenid=wxMpUserList.getNextOpenid();
                hasMore= org.springframework.util.StringUtils.hasText(nextOpenid) && wxMpUserList.getCount()>=10000;
            }
        } catch (WxErrorException e) {
            log.error("同步公众号粉丝出错:",e);
        }finally {
            syncWxUserTaskRunning=false;
        }
        log.info("同步公众号粉丝列表：完成");
    }

    /**
     * 通过传入的openid列表，同步用户列表
     * @param openids
     */
    @Override
    public void syncWxUsers(List<String> openids,String appid) {
        if(openids.size()<1) {
            return;
        }
        final String batch=openids.get(0).substring(20);//截取首个openid的一部分做批次号（打印日志时使用，无实际意义）
        WxMpUserService wxMpUserService = wxService.getUserService();
        int start=0,batchSize=openids.size(),end=Math.min(100,batchSize);
        log.info("开始处理批次：{}，批次数量：{}",batch,batchSize);
        while (start<end && end<=batchSize){//分批处理,每次最多拉取100个用户信息
            final int finalStart = start,finalEnd = end;
            final List<String> subOpenids=openids.subList(finalStart,finalEnd);
            threadPoolTaskExecutor.submit(()->{//使用线程池同步数据，否则大量粉丝数据需同步时会很慢
                log.info("同步批次:【{}--{}-{}】，数量：{}",batch, finalStart, finalEnd,subOpenids.size());
                wxService.switchover(appid);
                List<WxMpUser> wxMpUsers = null;//批量获取用户信息，每次最多100个
                try {
                    wxMpUsers = wxMpUserService.userInfoList(subOpenids);
                } catch (WxErrorException e) {
                    log.error("同步出错，批次：【{}--{}-{}】，错误信息：{}",batch, finalStart, finalEnd,e);
                }
                if(wxMpUsers!=null && !wxMpUsers.isEmpty()){
                    List<WxUser> wxUsers=wxMpUsers.parallelStream().map(item->new WxUser(item,appid)).collect(Collectors.toList());
                    baseMapper.insertOrUpdateBatch(wxUsers);
                }
            });
            start=end;
            end=Math.min(end+100,openids.size());
        }
        log.info("批次：{}处理完成",batch);
    }
    /**
     * 根据openid更新用户信息
     *
     * @param openid
     * @return
     */
    @Override
    public WxUser refreshUserInfo(String openid,String appid) {
        try {
            // 获取微信用户基本信息
            log.info("更新用户信息，openid={}",openid);
            wxService.switchover(appid);
            WxMpUser userWxInfo = wxService.getUserService().userInfo(openid, null);
            if (userWxInfo == null) {
                log.error("获取不到用户信息，无法更新,openid:{}",openid);
                return null;
            }
            WxUser user = new WxUser(userWxInfo,appid);
            baseMapper.insertOrUpdate(user);
            return user;
        } catch (Exception e) {
            log.error("更新用户信息失败,openid:{}",openid);
        }
        return null;
    }
    /**
     * 异步批量同步用户信息
     * @param openidList
     */
    @Override
    @Async
    public void refreshUserInfoAsync(String[] openidList,String appid) {
        log.info("批量更新用户信息：任务开始");
        for(String openid:openidList){
            wxService.switchover(appid);
            threadPoolTaskExecutor.submit(()->this.refreshUserInfo(openid,appid));
        }
        log.info("批量更新用户信息：任务全部添加到线程池");
    }
    @Override
    public void unsubscribe(String openid) {
        baseMapper.unsubscribe(openid);
    }

    @Override
    @Cacheable(key = CACHE_KEY+"+ #appid")
    public List<WxUserTag> getWxTags(String appid) throws WxErrorException {
        log.info("拉取公众号用户标签");
        wxService.switchoverTo(appid);
        return wxService.getUserTagService().tagGet();
    }

    @Override
    @CacheEvict(key = CACHE_KEY+"+ #appid")
    public void creatTag(String appid, String name) throws WxErrorException {
        wxService.switchoverTo(appid);
        wxService.getUserTagService().tagCreate(name);
    }

    @Override
    @CacheEvict(key = CACHE_KEY+"+ #appid")
    public void updateTag(String appid, Long tagid, String name) throws WxErrorException {
        wxService.switchoverTo(appid);
        wxService.getUserTagService().tagUpdate(tagid,name);
    }
    @Override
    @CacheEvict(key = CACHE_KEY+"+ #appid")
    public void deleteTag(String appid, Long tagid) throws WxErrorException {
        wxService.switchoverTo(appid);
        wxService.getUserTagService().tagDelete(tagid);
    }

    @Override
    public void batchTagging(String appid, Long tagid, String[] openidList) throws WxErrorException {
        wxService.switchoverTo(appid);
        wxService.getUserTagService().batchTagging(tagid,openidList);
        refreshUserInfoAsync(openidList,appid);//标签更新后更新对应用户信息
    }

    @Override
    public void batchUnTagging(String appid, Long tagid, String[] openidList) throws WxErrorException {
        wxService.switchoverTo(appid);
        wxService.getUserTagService().batchUntagging(tagid,openidList);
        refreshUserInfoAsync(openidList,appid);//标签更新后更新对应用户信息
    }
}
